22. 实践课-视觉检测节点开发

视觉检测节点开发

关联:索引

  1. 场景提问:分拣产线相机拍到苹果,系统要把“苹果在哪、是什么等级、有无缺陷”变成可发布的数据,你觉得中间要经历哪些步骤?

1)目标检测输出到底是什么?

目标检测(Detection)的最小输出集合通常包含:

工程上要牢记:检测模型输出的是“概率意义上的候选”,不是“百分百真实”。所以后处理要做阈值、去重、规则解释与验证。

2)YOLOv8 推理链路(从输入到框)

用一句话描述 YOLOv8(面向工程实现):

关键点(与节点开发相关):

  1. 数据输入层:订阅 sensor_msgs/msg/Image(相机话题)。
  2. 推理层:调用 YOLOv8 推理,得到候选框。
  3. 后处理层:把检测框变成“业务可解释的指标”(果径、着色、缺陷)与最终等级。
  4. 输出层:封装成 ROS2 消息并发布(至少发布 2 路:原始检测结果 + 品质结果)。

本讲选择的“通用检测消息类型”是 vision_msgs/msg/Detection2DArray(更通用、便于其它节点复用)。其结构参考官方定义:header + detections[],每个 detection 里包含 bbox + results[](类别与置信度)等字段(见 ROS2 Humble 文档:Detection2D 与 Detection2DArray)。

  1. 快问快答:视觉检测节点最少要订阅什么、发布什么?为什么我们要同时发布“原始检测结果”和“业务品质结果”?

1)安装 ROS 侧依赖(Ubuntu 22.04 + Humble)

sudo apt update
sudo apt install -y \
  ros-humble-cv-bridge \
  ros-humble-image-transport \
  ros-humble-vision-msgs

解释与自检:

2)安装 YOLOv8(Ultralytics)Python 依赖(推荐方案:venv 固化环境)

本讲推荐用 venv 的原因:避免系统 Python 依赖冲突,且可把“能跑的环境”作为交付的一部分。

python3 -m venv --system-site-packages ~/venvs/yolo_ros2
source ~/venvs/yolo_ros2/bin/activate
python -m pip install -U pip
python -m pip install ultralytics opencv-python

解释与自检:

python -c "import ultralytics; import cv2; print('ultralytics=', ultralytics.__version__); print('opencv=', cv2.__version__)"
source /opt/ros/humble/setup.bash
python -c "import rclpy; import cv_bridge; print('rclpy and cv_bridge ok')"
python - <<'PY'
from ultralytics import YOLO
m = YOLO('yolov8n.pt')
print('model loaded')
PY

本讲的“品质结果”不建议强行塞进 Detection2DArray,原因是:品质字段(果径/着色/等级)属于业务扩展,应该和通用检测结果解耦,便于其它项目复用通用消息。

1)创建接口包(示例:apple_quality_msgs

cd ~/ros2_ws/src
ros2 pkg create apple_quality_msgs --build-type ament_cmake

解释与自检:

2)定义消息文件(两种:单个苹果 + 数组)

apple_quality_msgs/msg/AppleQuality.msg

std_msgs/Header header
string id
float32 center_x
float32 center_y
float32 size_x
float32 size_y
float32 diameter_mm
float32 red_ratio
float32 defect_score
string grade

apple_quality_msgs/msg/AppleQualityArray.msg

std_msgs/Header header
AppleQuality[] apples

解释:

3)在 CMakeLists.txtpackage.xml 中启用 rosidl(关键步骤)

apple_quality_msgs/package.xml(关键依赖片段)

<buildtool_depend>ament_cmake</buildtool_depend>

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>

<depend>std_msgs</depend>

解释:

apple_quality_msgs/CMakeLists.txt(最小可用片段)

cmake_minimum_required(VERSION 3.8)
project(apple_quality_msgs)

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/AppleQuality.msg"
  "msg/AppleQualityArray.msg"
  DEPENDENCIES std_msgs
)

ament_export_dependencies(rosidl_default_runtime)
ament_package()

解释:

构建与自检:

cd ~/ros2_ws
colcon build --packages-select apple_quality_msgs
source install/setup.bash
ros2 interface show apple_quality_msgs/msg/AppleQuality

本的检测节点输入是 sensor_msgs/msg/Image。但你们的“相机来源”可能有两种:

目标:把文件夹里的多张苹果真实照片,循环发布成 ROS2 Image 话题,例如 /real/camera/image_raw,让 YOLO 节点订阅它。

推荐话题命名(避免与仿真相机冲突):

实现方式(最小可跑通):在你们的 Python 包里增加一个“图片播放器节点”,每隔 publish_hz 秒读取下一张图片并发布。

示例代码(可复制,发布文件夹图片到 ROS2):

import glob
from pathlib import Path

import cv2

import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from sensor_msgs.msg import Image
from cv_bridge import CvBridge

class ImageFolderPublisher(Node):
    def __init__(self) -> None:
        super().__init__("image_folder_publisher")

        self.declare_parameter("image_dir", "")
        self.declare_parameter("image_topic", "/real/camera/image_raw")
        self.declare_parameter("publish_hz", 5.0)
        self.declare_parameter("loop", True)

        image_dir = self.get_parameter("image_dir").get_parameter_value().string_value
        image_topic = self.get_parameter("image_topic").get_parameter_value().string_value
        publish_hz = self.get_parameter("publish_hz").get_parameter_value().double_value
        loop = self.get_parameter("loop").get_parameter_value().bool_value

        if not image_dir:
            raise RuntimeError("parameter image_dir is empty")

        patterns = ["*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"]
        files: list[str] = []
        for p in patterns:
            files.extend(glob.glob(str(Path(image_dir) / p)))
        files = sorted(files)

        if not files:
            raise RuntimeError(f"no image files found in {image_dir}")

        self.files = files
        self.loop = bool(loop)
        self.index = 0
        self.bridge = CvBridge()

        self.pub = self.create_publisher(Image, image_topic, qos_profile_sensor_data)
        self.timer = self.create_timer(1.0 / float(publish_hz), self.on_timer)

        self.get_logger().info(f"publishing {len(self.files)} images from {image_dir} to {image_topic} at {publish_hz}Hz")

    def on_timer(self) -> None:
        if self.index >= len(self.files):
            if not self.loop:
                self.get_logger().info("finished publishing (loop=false), shutting down")
                rclpy.shutdown()
                return
            self.index = 0

        path = self.files[self.index]
        self.index += 1

        bgr = cv2.imread(path, cv2.IMREAD_COLOR)
        if bgr is None:
            self.get_logger().warn(f"failed to read image: {path}")
            return

        msg = self.bridge.cv2_to_imgmsg(bgr, encoding="bgr8")
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = "real_camera"
        self.pub.publish(msg)

def main(args=None) -> None:
    rclpy.init(args=args)
    node = ImageFolderPublisher()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

逐段解释与自检:

  1. 将上述代码保存为:~/ros2_ws/src/apple_vision_yolov8/apple_vision_yolov8/image_folder_publisher_node.py
  2. apple_vision_yolov8/setup.pyentry_points 中增加一个可执行入口:
entry_points={
    "console_scripts": [
        "image_folder_publisher = apple_vision_yolov8.image_folder_publisher_node:main",
    ],
},

解释:

source /opt/ros/humble/setup.bash
source ~/venvs/yolo_ros2/bin/activate

cd ~/ros2_ws
colcon build --symlink-install --packages-select apple_vision_yolov8
source install/setup.bash

ros2 run apple_vision_yolov8 image_folder_publisher --ros-args \
  -p image_dir:=/path/to/apple_images_folder \
  -p image_topic:=/real/camera/image_raw \
  -p publish_hz:=5.0
ros2 topic info /real/camera/image_raw -v
ros2 topic echo /real/camera/image_raw --once

方案 B:Gazebo Fortress 相机作为输入(后续闭环用)

目标:让 Gazebo Fortress(Ignition)仿真相机发布图像,再通过桥接变成 ROS2 的 sensor_msgs/msg/Image,供 YOLO 节点订阅。

关键结论(你们需要提前知道):

你需要的系统要素(不展开细节,见《16 Gazebo Fortress ... 适配基础》):

本讲的最小闭环“先不做品质逻辑”,只做:

1)创建节点包(Python,示例:apple_vision_yolov8

cd ~/ros2_ws/src
ros2 pkg create apple_vision_yolov8 --build-type ament_python --dependencies rclpy sensor_msgs vision_msgs cv_bridge

解释与自检:

2)核心节点代码(可复制版本)

建议文件路径:

import time

import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray, Detection2D, BoundingBox2D, ObjectHypothesisWithPose

from cv_bridge import CvBridge

from ultralytics import YOLO

class Yolov8DetectorNode(Node):
    def __init__(self) -> None:
        super().__init__("yolov8_detector")

        self.declare_parameter("image_topic", "/real/camera/image_raw")
        self.declare_parameter("detections_topic", "/sorting/perception/yolo_detections")
        self.declare_parameter("model_path", "yolov8n.pt")
        self.declare_parameter("conf_thres", 0.5)
        self.declare_parameter("device", "cpu")

        image_topic = self.get_parameter("image_topic").get_parameter_value().string_value
        detections_topic = self.get_parameter("detections_topic").get_parameter_value().string_value
        model_path = self.get_parameter("model_path").get_parameter_value().string_value
        conf_thres = self.get_parameter("conf_thres").get_parameter_value().double_value
        device = self.get_parameter("device").get_parameter_value().string_value

        self.bridge = CvBridge()
        self.model = YOLO(model_path)
        self.device = str(device)
        self.conf_thres = float(conf_thres)

        self.pub = self.create_publisher(Detection2DArray, detections_topic, 10)
        self.sub = self.create_subscription(Image, image_topic, self.on_image, qos_profile_sensor_data)

        self.get_logger().info(
            f"yolov8_detector started. image_topic={image_topic} detections_topic={detections_topic} model={model_path} device={device}"
        )

    def on_image(self, msg: Image) -> None:
        t0 = time.perf_counter()

        frame_bgr = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
        result = self.model.predict(frame_bgr, verbose=False, device=self.device, conf=self.conf_thres)[0]

        det_arr = Detection2DArray()
        det_arr.header = msg.header

        names = result.names
        boxes = result.boxes

        if boxes is not None:
            for i in range(len(boxes)):
                b = boxes[i]
                score = float(b.conf.item())

                cls_id = int(b.cls.item())
                cls_name = str(names.get(cls_id, str(cls_id)))

                x1, y1, x2, y2 = [float(v) for v in b.xyxy[0].tolist()]
                cx = (x1 + x2) / 2.0
                cy = (y1 + y2) / 2.0
                sx = (x2 - x1)
                sy = (y2 - y1)

                det = Detection2D()
                det.header = msg.header
                det.id = str(i)

                det.bbox = BoundingBox2D()
                det.bbox.center.position.x = cx
                det.bbox.center.position.y = cy
                det.bbox.center.theta = 0.0
                det.bbox.size_x = sx
                det.bbox.size_y = sy

                hyp = ObjectHypothesisWithPose()
                hyp.hypothesis.class_id = cls_name
                hyp.hypothesis.score = score
                det.results.append(hyp)

                det_arr.detections.append(det)

        self.pub.publish(det_arr)

        dt_ms = (time.perf_counter() - t0) * 1000.0
        self.get_logger().info(f"published {len(det_arr.detections)} detections, latency={dt_ms:.1f}ms")

def main(args=None) -> None:
    rclpy.init(args=args)
    node = Yolov8DetectorNode()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

apple_vision_yolov8/setup.pyentry_points 添加:

entry_points={
    "console_scripts": [
        "yolov8_detector = apple_vision_yolov8.yolov8_detector_node:main",
    ],
},

解释:

source /opt/ros/humble/setup.bash
source ~/venvs/yolo_ros2/bin/activate

cd ~/ros2_ws
colcon build --symlink-install --packages-select apple_vision_yolov8
source install/setup.bash

ros2 run apple_vision_yolov8 yolov8_detector --ros-args -p image_topic:=/real/camera/image_raw

解释:

ros2 topic list | grep yolo
ros2 topic info /sorting/perception/yolo_detections -v
ros2 topic echo /sorting/perception/yolo_detections --once
ros2 topic hz /sorting/perception/yolo_detections

你应该能说清的“证据解释”:

  1. 复盘:我们已经发布了“检测框”,但分拣系统真正需要的是“可用的业务指标”。你认为哪些指标最能决定苹果的分级?

1)果径(diameter)的估计:必须说明“单位来源”

注意:

2)着色(red_ratio):把“红”定义成可计算的阈值

3)缺陷(defect_score):先给出“可落地”的最小规则

两种落地方式(本讲建议 A,B 作为扩展):

1)在节点包中增加对接口包的依赖

创建依赖前,先构建并 source 接口包(否则 Python 找不到消息类型):

cd ~/ros2_ws
colcon build --packages-select apple_quality_msgs
source install/setup.bash

然后在创建节点包时或事后,确保依赖中包含 apple_quality_msgs(package.xml 与 setup.py 的 install_requires/依赖声明要对齐,你们可按“工作空间规范课”的口径修改)。

2)品质后处理函数(可复制)

将下面函数放到 yolov8_detector_node.py 同文件中(或拆到 utils.py,推荐拆分以体现模块化思维)。

import cv2
import numpy as np

def compute_red_ratio(bgr_roi: np.ndarray) -> float:
    if bgr_roi.size == 0:
        return 0.0
    hsv = cv2.cvtColor(bgr_roi, cv2.COLOR_BGR2HSV)
    lower1 = np.array([0, 80, 50], dtype=np.uint8)
    upper1 = np.array([10, 255, 255], dtype=np.uint8)
    lower2 = np.array([170, 80, 50], dtype=np.uint8)
    upper2 = np.array([180, 255, 255], dtype=np.uint8)
    mask1 = cv2.inRange(hsv, lower1, upper1)
    mask2 = cv2.inRange(hsv, lower2, upper2)
    mask = cv2.bitwise_or(mask1, mask2)
    red_pixels = int(np.count_nonzero(mask))
    total_pixels = int(mask.size)
    return float(red_pixels) / float(total_pixels)

def estimate_diameter_mm(size_x_px: float, size_y_px: float, mm_per_px: float) -> float:
    diameter_px = max(float(size_x_px), float(size_y_px))
    return diameter_px * float(mm_per_px)

def grade_apple(diameter_mm: float, red_ratio: float, defect_score: float) -> str:
    if defect_score >= 0.6:
        return "C"
    if diameter_mm >= 80.0 and red_ratio >= 0.6:
        return "A"
    if diameter_mm >= 70.0 and red_ratio >= 0.4:
        return "B"
    return "C"

逐段解释与自检:

3)发布品质消息(可复制核心片段)

在节点中新增:

示例片段(展示关键逻辑,要求你们能把它集成进自己的节点并跑通):

from apple_quality_msgs.msg import AppleQuality, AppleQualityArray

# ... 在 __init__ 里新增参数与 publisher(示例)
# self.declare_parameter("quality_topic", "/sorting/perception/apple_quality")
# self.declare_parameter("mm_per_px", 0.2)
# self.quality_pub = self.create_publisher(AppleQualityArray, quality_topic, 10)

# ... 在 on_image() 里:在发布 det_arr 后,生成 quality_arr(示例)
quality_arr = AppleQualityArray()
quality_arr.header = msg.header

defect_best = {}
for det in det_arr.detections:
    if not det.results:
        continue
    cls = det.results[0].hypothesis.class_id
    score = float(det.results[0].hypothesis.score)
    if cls == "defect":
        defect_best[det.id] = max(defect_best.get(det.id, 0.0), score)

for det in det_arr.detections:
    if not det.results:
        continue
    cls = det.results[0].hypothesis.class_id
    if cls != "apple":
        continue

    cx = float(det.bbox.center.position.x)
    cy = float(det.bbox.center.position.y)
    sx = float(det.bbox.size_x)
    sy = float(det.bbox.size_y)

    x1 = int(max(0.0, cx - sx / 2.0))
    y1 = int(max(0.0, cy - sy / 2.0))
    x2 = int(min(frame_bgr.shape[1] - 1, cx + sx / 2.0))
    y2 = int(min(frame_bgr.shape[0] - 1, cy + sy / 2.0))
    roi = frame_bgr[y1:y2, x1:x2]

    red_ratio = compute_red_ratio(roi)
    diameter_mm = estimate_diameter_mm(sx, sy, self.mm_per_px)
    defect_score = float(defect_best.get(det.id, 0.0))
    grade = grade_apple(diameter_mm, red_ratio, defect_score)

    aq = AppleQuality()
    aq.header = msg.header
    aq.id = det.id
    aq.center_x = cx
    aq.center_y = cy
    aq.size_x = sx
    aq.size_y = sy
    aq.diameter_mm = float(diameter_mm)
    aq.red_ratio = float(red_ratio)
    aq.defect_score = float(defect_score)
    aq.grade = grade

    quality_arr.apples.append(aq)

self.quality_pub.publish(quality_arr)

逐段解释(你们需要能讲清的要点):

ros2 topic info /sorting/perception/apple_quality -v
ros2 topic echo /sorting/perception/apple_quality --once

你在说明里至少回答 3 点:

  1. 复盘:现在我们“能跑”了,但工程交付还缺什么?(提示:参数化、性能、鲁棒性、证据链)
你是 ROS2 Humble + Python 视觉节点开发专家。请基于以下约束,生成一个可运行的 YOLOv8 视觉检测节点(ament_python)。

输入:
- 订阅 sensor_msgs/msg/Image(参数 image_topic)

输出(必须同时发布 2 路话题):
- /sorting/perception/yolo_detections:vision_msgs/msg/Detection2DArray
- /sorting/perception/apple_quality:apple_quality_msgs/msg/AppleQualityArray(字段:center/size/diameter_mm/red_ratio/defect_score/grade)

工程约束:
1) ROS2:Humble;Python:rclpy;图像转换:cv_bridge;推理:ultralytics.YOLO
2) 参数化:model_path、conf_thres、device、mm_per_px、red_hsv_thresholds(建议可配置)
3) 性能:避免在回调中做长时间阻塞;给出日志打印每帧推理耗时
4) 验收:给出 colcon build、ros2 run、ros2 topic echo/hz 的命令与期望现象
5) 代码要包含必要注释,并说明每个参数的作用

解释:

2)审计清单(必须写进作业的“审计优化痕迹”)

  1. 参数化阈值:把 grade_apple 的阈值(80/70、0.6/0.4、defect 0.6)做成 ROS2 参数,运行时可调。
  2. 降低阻塞:将推理从订阅回调移到定时器(例如保存最新帧、按固定频率推理),避免相机高帧率时回调堆积。
  3. 日志降噪:每 N 帧打印一次 latency;或在 latency 超过阈值时才 warn。

回归测试口径(每次改完必须跑):

ros2 topic echo /sorting/perception/yolo_detections --once
ros2 topic echo /sorting/perception/apple_quality --once
ros2 topic hz /sorting/perception/yolo_detections

你需要能解释的“回归结论”:

收束到工程职业素养(本讲统一口径):

  1. ModuleNotFoundError: ultralytics
  1. CvBridgeError: ... encoding
  1. 话题存在但 echo 没有数据
  1. 推理太慢导致 hz 很低

参考与延伸(建议课后自学)